/* 简介：cothread 是一个轻量级协程调度器，由纯C语言实现，易于移植到各种单片机。
 * 同时，由于该调度器仅仅运行在一个实际线程中，所以它也适用于服务器高并发场景。
 *
 * 版本: 1.0.0   2019/02/25
 *
 * 作者: 覃攀 <qinpan1003@qq.com>
 *
 */

#include "rtos.h"

/*
 * Create a message queue descriptor.
 *
 * mq_name          - queue name; copied into the descriptor and truncated to
 *                    at most MQ_NAME_SIZE - 1 characters. NULL is treated as
 *                    an empty name.
 * maxmsgs          - stored as the limit on the number of queued messages.
 * max_permsg_bytes - stored as the size limit of a single message.
 * max_total_bytes  - stored as the limit on the total queued bytes.
 *
 * Returns the newly allocated, zero-initialised descriptor, or NULL if the
 * allocation fails.
 */
mqd_t mq_open(const char *mq_name, int maxmsgs, int max_permsg_bytes, int max_total_bytes)
{
    /* A NULL name must not reach strlen() or the "%s" log argument. */
    const char *name = (mq_name != NULL) ? mq_name : "";
    size_t name_len;
    mqd_t q = malloc(sizeof(struct mq_des));

    if (q == NULL)
    {
        LOG("mq open failed %s \n.", name);
        return NULL;
    }
    memset(q, 0, sizeof(struct mq_des));

    /* Measure the name once and leave room for the terminating NUL. */
    name_len = strlen(name);
    if (name_len > MQ_NAME_SIZE - 1)
        name_len = MQ_NAME_SIZE - 1;
    memcpy(q->name, name, name_len);
    q->name[MQ_NAME_SIZE - 1] = 0;

    q->maxmsgs = maxmsgs;
    q->max_permsg_bytes = max_permsg_bytes;
    q->max_total_bytes = max_total_bytes;

    return q;
}

/*
 * Register ccb as a waiter on mqdes.
 *
 * A new mq_wait node is allocated and appended at the tail of the queue's
 * circular doubly-linked wait list (mqdes->wait_quene points at the first
 * waiter). If the allocation fails the error is logged and the thread is
 * NOT registered.
 */
static void mq_add_wait_quene(mqd_t mqdes, ccb_t *ccb)
{
    struct mq_wait *node;
    struct mq_wait *first;
    struct mq_wait *last;

    node = malloc(sizeof(struct mq_wait));
    if (node == NULL)
    {
        LOG_ERR("mq_add_wait_quene malloc failed.\n");
        return;
    }
    memset(node, 0, sizeof(struct mq_wait));
    node->thread = ccb;

    first = mqdes->wait_quene;
    if (first != NULL)
    {
        /* Non-empty ring: splice the node in just before the first waiter,
         * which makes it the new tail. */
        last = first->prev;
        node->next = first;
        node->prev = last;
        last->next = node;
        first->prev = node;
        return;
    }

    /* Empty list: the node forms a ring of one and becomes the first waiter. */
    node->next = node;
    node->prev = node;
    mqdes->wait_quene = node;
}

/*
 * Take the oldest queued message off mqdes and copy it into msg.
 *
 * mqdes  - queue to read from.
 * msg    - destination buffer.
 * msglen - capacity of msg in bytes; a longer message is truncated to fit
 *          and the excess bytes are discarded together with the message.
 * prio   - not used by this implementation.
 *
 * Returns:
 *   > 0  number of bytes copied into msg.
 *   0    the queue is empty; current_thread has been appended to the wait
 *        list so that mq_send() can signal it. Note that a zero-length
 *        message (or msglen == 0) also yields 0.
 *   -1   a message is available but the arguments are invalid (msglen < 0,
 *        or msg == NULL with msglen > 0); the message is left in the queue.
 */
int __mq_receive(mqd_t mqdes, char *msg, int msglen, unsigned int *prio)
{
    int length = 0;
    struct mq_msg *msgq = mqdes->msg_tail;

    (void)prio;

    if (msgq == NULL)
    {
        mq_add_wait_quene(mqdes, current_thread);
        return 0;
    }

    /* A negative length would be converted to a huge size_t by memcpy() and
     * overrun the caller's buffer; reject it before dequeuing anything. */
    if (msglen < 0 || (msg == NULL && msglen > 0))
        return -1;

    /* msg_tail is the consumer end; when it is also the producer end the
     * queue becomes empty after this removal. */
    if (mqdes->msg_tail == mqdes->msg_head)
        mqdes->msg_head = NULL;
    mqdes->msg_tail = msgq->next;

    length = msgq->length;
    if (length > msglen)
        length = msglen;

    /* Accounting uses the full stored length, not the truncated one. */
    mqdes->current_msgs--;
    mqdes->current_bytes -= msgq->length;

    if (length > 0)
        memcpy(msg, msgq->buff, (size_t)length);

    free(msgq->buff);
    free(msgq);
    return length;
}

/*
 * Append a copy of msg to the queue and wake the first waiting consumer.
 *
 * mqdes  - queue to post to.
 * msg    - message bytes; copied, so the caller keeps ownership.
 * msglen - number of bytes in msg (0 is allowed).
 * prio   - not used by this implementation.
 *
 * Returns 0 on success, or a negative code and leaves the queue untouched:
 *   -1  the queue already holds maxmsgs messages.
 *   -2  the message would push the queued bytes past max_total_bytes.
 *   -3  the message is larger than max_permsg_bytes.
 *   -4  allocating the message node failed.
 *   -5  allocating the message buffer failed.
 *   -6  invalid arguments (msglen < 0, or msg == NULL with msglen > 0).
 */
int mq_send(mqd_t mqdes, const char *msg, int msglen, unsigned int prio)
{
    int current_msgs = mqdes->current_msgs + 1;
    int current_bytes = mqdes->current_bytes + msglen;
    struct mq_msg *msgq;

    (void)prio;

    /* A negative length would corrupt the byte accounting and be turned
     * into a huge size by malloc()/memcpy(). */
    if (msglen < 0 || (msg == NULL && msglen > 0))
        return -6;

    if (current_msgs > mqdes->maxmsgs)
        return -1;

    if (current_bytes > mqdes->max_total_bytes)
        return -2;

    if (msglen > mqdes->max_permsg_bytes)
        return -3;

    msgq = malloc(sizeof(struct mq_msg));
    if (msgq == NULL)
        return -4;
    memset(msgq, 0, sizeof(struct mq_msg));

    /* malloc(0) may legitimately return NULL, so always request at least
     * one byte to keep "NULL" meaning "out of memory". */
    msgq->buff = malloc(msglen > 0 ? (size_t)msglen : 1);
    if (msgq->buff == NULL)
    {
        free(msgq);
        return -5;
    }
    if (msglen > 0)
        memcpy(msgq->buff, msg, (size_t)msglen);

    msgq->length = msglen;
    msgq->next = NULL;

    /* Commit the accounting only now that nothing can fail any more, so an
     * allocation failure does not leave the counters inflated. */
    mqdes->current_msgs = current_msgs;
    mqdes->current_bytes = current_bytes;

    /* Link the node at the producer end (msg_head) of the list. */
    if (mqdes->msg_head)
        mqdes->msg_head->next = msgq;
    mqdes->msg_head = msgq;

    if (mqdes->msg_tail == NULL)
        mqdes->msg_tail = msgq;

    /* If consumers are waiting, signal the first one and drop it from the
     * circular wait list. */
    if (mqdes->wait_quene != NULL)
    {
        struct mq_wait *wait_head = mqdes->wait_quene;
        thread_signal(wait_head->thread, OsEvent(EVENT_MQ_ARRIVE));

        if (wait_head->next == wait_head)
            mqdes->wait_quene = NULL;
        else
        {
            wait_head->next->prev = wait_head->prev;
            wait_head->prev->next = wait_head->next;
            mqdes->wait_quene = wait_head->next;
        }

        free(wait_head);
    }

    return 0;
}

/*
 * Report whether the queue is more than half full.
 *
 * Returns 1 when the number of queued messages exceeds half of maxmsgs or
 * the queued bytes exceed half of max_total_bytes (integer division),
 * otherwise 0.
 */
int mq_load_warning(mqd_t mqdes)
{
    if (mqdes->current_msgs > (mqdes->maxmsgs / 2))
        return 1;

    if (mqdes->current_bytes > (mqdes->max_total_bytes / 2))
        return 1;

    return 0;
}

